package compactor

import (
	"context"
	"fmt"
	"os"
	"path/filepath"
	"sync"
	"unsafe"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/concurrency"
	"github.com/prometheus/common/model"

	"github.com/grafana/loki/v3/pkg/compactor/deletion"
	"github.com/grafana/loki/v3/pkg/compactor/retention"
	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	chunk_util "github.com/grafana/loki/v3/pkg/storage/chunk/client/util"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/storage"
	util_log "github.com/grafana/loki/v3/pkg/util/log"
)

const (
	// gzipExtension is the ".gz" file-name suffix. It is not referenced in the code visible
	// here; presumably it marks gzip-compressed index files elsewhere in the package (confirm
	// against its users).
	gzipExtension = ".gz"
)

// errFileCountNotOne is returned by openCompactedIndexForUpdates when an index set does not
// list exactly one source file, because the compacted index is opened from that single file.
var errFileCountNotOne = fmt.Errorf("can't apply retention or index updates when index file count is not one")

// tableExpirationChecker tells the table whether an index set is worth running retention on.
type tableExpirationChecker interface {
	// IntervalMayHaveExpiredChunks reports whether the given interval may contain expired chunks
	// for userID. applyRetention calls it with the interval extracted from the table name and skips
	// every index set for which it returns false; the common index set is checked with an empty userID.
	IntervalMayHaveExpiredChunks(interval model.Interval, userID string) bool
}

// IndexCompactor is what the table relies on to compact its index files (via a TableCompactor)
// and to open an already compacted index file when retention or index updates need to be applied.
type IndexCompactor interface {
	// NewTableCompactor returns a new TableCompactor for compacting a table.
	// commonIndexSet refers to common index files or in other words multi-tenant index.
	// existingUserIndexSet refers to existing user specific index files in the storage.
	// makeEmptyUserIndexSetFunc can be used for creating an empty indexSet for a user
	// who does not have an index for it in existingUserIndexSet.
	// periodConfig holds the PeriodConfig for the table.
	NewTableCompactor(
		ctx context.Context,
		commonIndexSet IndexSet,
		existingUserIndexSet map[string]IndexSet,
		makeEmptyUserIndexSetFunc MakeEmptyUserIndexSetFunc,
		periodConfig config.PeriodConfig,
	) TableCompactor

	// OpenCompactedIndexFile opens a compressed index file at given path.
	// The table passes the path returned by IndexSet.GetSourceFile as path, the table name as
	// tableName, the owning user (empty for the common index set) as userID, and the table's
	// working directory joined with userID as workingDir.
	// It returns the opened CompactedIndex, or an error if the file could not be opened.
	OpenCompactedIndexFile(
		ctx context.Context,
		path,
		tableName,
		userID,
		workingDir string,
		periodConfig config.PeriodConfig,
		logger log.Logger,
	) (
		CompactedIndex,
		error,
	)
}

// TableCompactor compacts a single table. Instances are created by IndexCompactor.NewTableCompactor,
// and table.compact returns whatever CompactTable returns.
type TableCompactor interface {
	// CompactTable compacts the table.
	// After compaction is done successfully, it should set the new/updated CompactedIndex for relevant IndexSets.
	CompactTable() (err error)
}

// Chunk exposes read-only metadata about a chunk.
type Chunk interface {
	// GetFrom returns the start of the chunk's time range.
	GetFrom() model.Time
	// GetThrough returns the end of the chunk's time range.
	GetThrough() model.Time
	// GetFingerprint returns the fingerprint associated with the chunk
	// (presumably the fingerprint of the stream it belongs to; confirm with implementations).
	GetFingerprint() uint64
	// GetChecksum returns the chunk's checksum.
	GetChecksum() uint32
	// GetSize returns the chunk's size (unit not visible here; presumably bytes or KB, confirm with implementations).
	GetSize() uint32
	// GetEntriesCount returns the number of entries in the chunk.
	GetEntriesCount() uint32
}

// MakeEmptyUserIndexSetFunc creates an IndexSet for a user who does not have one yet.
// The implementation that table.compact hands to NewTableCompactor also registers the new
// index set with the table while holding a mutex, so a TableCompactor may call it concurrently.
type MakeEmptyUserIndexSetFunc func(userID string) (IndexSet, error)

// table holds everything needed to compact one index table and to apply retention and
// storage updates on its index sets.
//
// Fields:
//   - name: the table name; newTable sets it to the base name of workingDirectory.
//   - workingDirectory: local directory for the table; ensured by newTable and removed by cleanup.
//     Per-user working directories are workingDirectory joined with the user ID.
//   - uploadConcurrency: concurrency passed to concurrency.ForEachJob when done() finishes the
//     per-user index sets.
//   - indexStorageClient: client used to refresh the table cache and list the table's files.
//   - indexCompactor: creates the TableCompactor and opens compacted index files for updates.
//   - tableMarker: passed to indexSet.runRetention and used to mark chunks for deletion.
//   - expirationChecker: consulted by applyRetention to skip index sets without expired chunks.
//   - periodConfig: the PeriodConfig for the table; also used to build external chunk keys.
//   - baseUserIndexSet, baseCommonIndexSet: storage index sets created in newTable and handed to
//     newUserIndexSet and newCommonIndexSet respectively.
//   - indexSets: index sets keyed by user ID; the empty key "" holds the common index set.
//   - usersWithPerUserIndex: users returned by ListFiles in compact as having a per-user index.
//   - logger: util_log.Logger annotated with the table name.
//   - ctx: context passed to storage and index compactor calls made on behalf of this table.
type table struct {
	name               string
	workingDirectory   string
	uploadConcurrency  int
	indexStorageClient storage.Client
	indexCompactor     IndexCompactor
	tableMarker        retention.TableMarker
	expirationChecker  tableExpirationChecker
	periodConfig       config.PeriodConfig

	baseUserIndexSet, baseCommonIndexSet storage.IndexSet

	indexSets             map[string]*indexSet
	usersWithPerUserIndex []string
	logger                log.Logger

	ctx context.Context
}

// newTable builds the compaction state for the table whose local files live under workingDirectory.
// The directory is ensured with chunk_util.EnsureDirectory first, and its base name becomes the
// table name. Two storage index sets are created from indexStorageClient: one for per-user index
// and one for the common index. The logger is util_log.Logger annotated with the table name.
// An error is returned only when the working directory cannot be ensured.
func newTable(ctx context.Context, workingDirectory string, indexStorageClient storage.Client,
	indexCompactor IndexCompactor, periodConfig config.PeriodConfig,
	tableMarker retention.TableMarker, expirationChecker tableExpirationChecker,
	uploadConcurrency int,
) (*table, error) {
	if err := chunk_util.EnsureDirectory(workingDirectory); err != nil {
		return nil, err
	}

	tableName := filepath.Base(workingDirectory)
	userIndexSet := storage.NewIndexSet(indexStorageClient, true)
	commonIndexSet := storage.NewIndexSet(indexStorageClient, false)

	return &table{
		name:               tableName,
		workingDirectory:   workingDirectory,
		uploadConcurrency:  uploadConcurrency,
		indexStorageClient: indexStorageClient,
		indexCompactor:     indexCompactor,
		tableMarker:        tableMarker,
		expirationChecker:  expirationChecker,
		periodConfig:       periodConfig,
		baseUserIndexSet:   userIndexSet,
		baseCommonIndexSet: commonIndexSet,
		indexSets:          map[string]*indexSet{},
		logger:             log.With(util_log.Logger, "table-name", tableName),
		ctx:                ctx,
	}, nil
}

// compact refreshes the index table cache, lists the table's files and, unless the table has
// neither common index files nor per-user index, sets up the common index set plus one index set
// per listed user before handing everything to a TableCompactor.
// It returns any error from listing files, creating index sets or CompactTable.
func (t *table) compact() error {
	t.indexStorageClient.RefreshIndexTableCache(t.ctx, t.name)

	commonFiles, perUserIndexUsers, err := t.indexStorageClient.ListFiles(t.ctx, t.name, false)
	if err != nil {
		return err
	}

	if len(commonFiles) == 0 && len(perUserIndexUsers) == 0 {
		level.Info(t.logger).Log("msg", "no common index files and user index found")
		return nil
	}

	t.usersWithPerUserIndex = perUserIndexUsers
	level.Info(t.logger).Log("msg", "listed files", "count", len(commonFiles))

	// The common (multi-tenant) index set is stored under the empty user ID.
	t.indexSets[""], err = newCommonIndexSet(t.ctx, t.name, t.baseCommonIndexSet, t.workingDirectory, t.logger)
	if err != nil {
		return err
	}

	// Go treats map[string]*indexSet and map[string]IndexSet as different types, so the per-user
	// index sets are mirrored into a second map purely for passing them to NewTableCompactor.
	existingUserIndexSets := make(map[string]IndexSet, len(t.usersWithPerUserIndex))
	for _, userID := range t.usersWithPerUserIndex {
		userWorkingDir := filepath.Join(t.workingDirectory, userID)
		t.indexSets[userID], err = newUserIndexSet(t.ctx, t.name, userID, t.baseUserIndexSet, userWorkingDir, t.logger)
		if err != nil {
			return err
		}
		existingUserIndexSets[userID] = t.indexSets[userID]
	}

	// The TableCompactor may ask for empty user index sets concurrently, so registering them in
	// t.indexSets is serialized with a mutex.
	var indexSetsMtx sync.Mutex
	makeEmptyUserIndexSet := func(userID string) (IndexSet, error) {
		indexSetsMtx.Lock()
		defer indexSetsMtx.Unlock()

		created, err := newUserIndexSet(t.ctx, t.name, userID, t.baseUserIndexSet, filepath.Join(t.workingDirectory, userID), t.logger)
		if err != nil {
			return nil, err
		}

		t.indexSets[userID] = created
		return created, nil
	}

	compactor := t.indexCompactor.NewTableCompactor(t.ctx, t.indexSets[""], existingUserIndexSets, makeEmptyUserIndexSet, t.periodConfig)
	return compactor.CompactTable()
}

// done finalizes every index set of the table: per-user index sets first (concurrently, bounded
// by uploadConcurrency), then the common index set.
// No index updates must be done after calling this method.
func (t *table) done() error {
	// indexSet.done() uploads the compacted db and cleans up the source index files.
	// Files in the common index set are also a source for the per-user index sets, so cleaning
	// up the common index set before all per-user uploads succeeded could lose data.
	// That is why the common index set (empty user ID) is left out here and handled last.
	perUserIDs := make([]string, 0, len(t.indexSets))
	for id := range t.indexSets {
		if id != "" {
			perUserIDs = append(perUserIDs, id)
		}
	}

	finishUserIndexSet := func(_ context.Context, jobIdx int) error {
		return t.indexSets[perUserIDs[jobIdx]].done()
	}
	if err := concurrency.ForEachJob(t.ctx, len(perUserIDs), t.uploadConcurrency, finishUserIndexSet); err != nil {
		return err
	}

	common, found := t.indexSets[""]
	if !found {
		return nil
	}

	return common.done()
}

// applyRetention runs retention on every index set of the table that may hold expired chunks.
// It stops at and returns the first error from opening a compacted index or running retention.
func (t *table) applyRetention() error {
	interval := retention.ExtractIntervalFromTableName(t.name)

	for userID, idxSet := range t.indexSets {
		// The common index set without a compacted index is skipped when either:
		// 1. It got compacted away to the per-user indexes.
		// 2. There are no common index files.
		if userID == "" && idxSet.compactedIndex == nil {
			compactedAway := idxSet.removeSourceObjects && !idxSet.uploadCompactedDB
			if compactedAway || len(idxSet.ListSourceFiles()) == 0 {
				continue
			}
		}

		// Nothing to do for index sets that cannot have expired chunks in this table's interval.
		if mayHaveExpired := t.expirationChecker.IntervalMayHaveExpiredChunks(interval, userID); !mayHaveExpired {
			continue
		}

		// compactedIndex is populated only when files were compacted in this run; otherwise the
		// existing compacted index file has to be opened before retention can be applied.
		if idxSet.compactedIndex == nil {
			if err := t.openCompactedIndexForUpdates(idxSet); err != nil {
				return err
			}
		}

		if err := idxSet.runRetention(t.tableMarker); err != nil {
			return err
		}
	}

	return nil
}

// openCompactedIndexForUpdates opens the only source file of idxSet as a compacted index and
// sets it on idxSet so that retention or index updates can be applied to it.
// It returns errFileCountNotOne when idxSet does not list exactly one source file, and otherwise
// any error from fetching the source file or opening it.
func (t *table) openCompactedIndexForUpdates(idxSet *indexSet) error {
	files := idxSet.ListSourceFiles()
	if len(files) != 1 {
		return errFileCountNotOne
	}

	localPath, err := idxSet.GetSourceFile(files[0])
	if err != nil {
		return err
	}

	userWorkingDir := filepath.Join(t.workingDirectory, idxSet.userID)
	opened, err := t.indexCompactor.OpenCompactedIndexFile(t.ctx, localPath, t.name, idxSet.userID, userWorkingDir, t.periodConfig, idxSet.logger)
	if err != nil {
		return err
	}

	idxSet.setCompactedIndex(opened, false, false)
	return nil
}

// applyStorageUpdates applies storage updates for a single stream of a user.
// labelsStr is parsed into the stream's labels; rebuiltChunks and chunksToDeIndex are passed on
// to the user's index set. Afterwards the chunks that are no longer needed are marked for
// deletion through the table marker.
// It returns any error from parsing labels, opening the compacted index, updating the index,
// checking chunk existence or marking chunks for deletion.
func (t *table) applyStorageUpdates(userID, labelsStr string, rebuiltChunks map[string]deletion.Chunk, chunksToDeIndex []string) error {
	labels, err := syntax.ParseLabels(labelsStr)
	if err != nil {
		return err
	}

	// Used only for deriving the external key (chunk ID) of a chunk reference.
	schemaCfg := config.SchemaConfig{Configs: []config.PeriodConfig{t.periodConfig}}

	idxSet, found := t.indexSets[userID]
	if !found {
		// Index for the user does not exist, likely removed by retention/deletion without line filter.
		// Mark all the rebuilt chunks for deletion.
		level.Info(util_log.Logger).Log("msg", "user index not found, removing the newly built chunks", "table_name", t.name, "userID", userID)

		orphanedChunkIDs := make([]string, 0, len(rebuiltChunks))
		for _, rebuilt := range rebuiltChunks {
			if rebuilt == nil {
				continue
			}
			ref := logproto.ChunkRef{
				Fingerprint: rebuilt.GetFingerprint(),
				UserID:      userID,
				From:        rebuilt.GetFrom(),
				Through:     rebuilt.GetThrough(),
				Checksum:    rebuilt.GetChecksum(),
			}
			orphanedChunkIDs = append(orphanedChunkIDs, schemaCfg.ExternalKey(ref))
		}

		return t.tableMarker.MarkChunksForDeletion(t.name, orphanedChunkIDs)
	}

	// compactedIndex is populated only when files were compacted in this run; otherwise the
	// existing compacted index file has to be opened before index updates can be applied.
	if idxSet.compactedIndex == nil {
		if err := t.openCompactedIndexForUpdates(idxSet); err != nil {
			return err
		}
	}

	notIndexed, err := idxSet.applyUpdates(labels, rebuiltChunks, chunksToDeIndex)
	if err != nil {
		return err
	}

	// Start with the source chunks, which are the keys of rebuiltChunks.
	toDelete := make([]string, 0, len(rebuiltChunks)+len(notIndexed))
	for sourceChunkID := range rebuiltChunks {
		toDelete = append(toDelete, sourceChunkID)
	}

	// Newly built chunks that were not indexed because their source chunks are missing from the
	// current index get removed as well. Source chunks could be deleted by retention or delete
	// requests without line filters.
	// Storage updates are supposed to be idempotent though, so a chunk that is already present in
	// the index (indexed by a previous attempt which also removed the source chunk) is kept.
	for _, chk := range notIndexed {
		ref := logproto.ChunkRef{
			Fingerprint: chk.GetFingerprint(),
			UserID:      userID,
			From:        chk.GetFrom(),
			Through:     chk.GetThrough(),
			Checksum:    chk.GetChecksum(),
		}

		alreadyIndexed, err := idxSet.chunkExists(labels, ref)
		if err != nil {
			return err
		}

		if !alreadyIndexed {
			toDelete = append(toDelete, schemaCfg.ExternalKey(ref))
		}
	}

	return t.tableMarker.MarkChunksForDeletion(t.name, toDelete)
}

// cleanup releases local resources of the table: every index set is cleaned up first and then
// the table's working directory is removed from disk. A failure to remove the directory is
// logged rather than returned.
func (t *table) cleanup() {
	for _, idxSet := range t.indexSets {
		idxSet.cleanup()
	}

	err := os.RemoveAll(t.workingDirectory)
	if err == nil {
		return
	}

	level.Error(t.logger).Log("msg", fmt.Sprintf("failed to remove working directory %s", t.workingDirectory), "err", err)
}

// GetUserIndex returns the compacted index of the given user as a retention.SeriesIterator.
// It returns (nil, nil) when the table has no index set for userID, and an error when the
// compacted index file has to be opened and that fails.
func (t *table) GetUserIndex(userID string) (retention.SeriesIterator, error) {
	idxSet, found := t.indexSets[userID]
	if !found {
		return nil, nil
	}

	// Already available when the files were compacted in this run.
	if idxSet.compactedIndex != nil {
		return idxSet.compactedIndex, nil
	}

	// Otherwise the existing compacted index file has to be opened first.
	if err := t.openCompactedIndexForUpdates(idxSet); err != nil {
		return nil, err
	}

	return idxSet.compactedIndex, nil
}

// tableHasUncompactedIndex reports whether the table has more than one common index file.
// A single common index file is tolerated because the earlier boltdb-shipper index type had no
// per tenant index, so a table would only ever have common index files.
// With per tenant index, having just one uncompacted common index file for a while should be fine,
// so such a table is considered compacted as well.
// The error from listing the files is returned alongside the result.
func tableHasUncompactedIndex(ctx context.Context, tableName string, indexStorageClient storage.Client) (bool, error) {
	commonFiles, _, err := indexStorageClient.ListFiles(ctx, tableName, false)
	hasUncompacted := len(commonFiles) > 1

	return hasUncompacted, err
}

// unsafeGetBytes returns the bytes of s as a slice without copying them.
// The slice shares memory with the string, so callers must never modify it.
func unsafeGetBytes(s string) []byte {
	data := unsafe.StringData(s)      // #nosec G103 -- we know the string is not mutated -- nosemgrep: use-of-unsafe-block
	return unsafe.Slice(data, len(s)) // #nosec G103 -- we know the string is not mutated -- nosemgrep: use-of-unsafe-block
}
